/*******************************************************************************
 * Package: com.song.bigdata.stream
 * Type:    MyTransformationMap
 * Date:    2022-10-28 16:14
 *
 * Copyright (c) 2022 HUANENG GUICHENG TRUST CORP.,LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.bigdata.stream;

import com.song.bigdata.stream.pojo.User;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * 功能描述： 转换算子
 *“一一映射”，消费一个元素就产出一个元素
 * 输入什么就输出什么  我们可以在 将数据转换后输出
 * @author Songxianyang
 * @date 2022-10-28 16:14
 */
public class MyTransformationMap {
    public static void main(String[] args) throws Exception {
        // 创建环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<User> streamSource = environment.fromElements(
                new User("song-user", 12), new User("xian-user", 92)
        );
        SingleOutputStreamOperator<User> filter = streamSource.filter(user -> user.age == 12);
        // 数据操作 吧 名字输出
        streamSource.map(new MyMapFunction()).print();
        //
        streamSource.flatMap(new MyFlatMap()).print();
        //// 在streamSource 中找到 年龄
        //streamSource.map(new MapFunction<User, User>() {
        //
        //    @Override
        //    public User map(User user) throws Exception {
        //        user.age=user.age+22;
        //        return user;
        //    }
        //});
        //source.print();
        System.out.println("------------------>");
        filter.print();
        environment.execute();
    }
}

/**
 * 一对一的操作
 * User进入的对象
 * String 要返回的对象
 */
class MyMapFunction implements MapFunction<User ,String> {

    @Override
    public String map(User user) throws Exception {
        return user.name;
    }
}
/**
 * User进入的对象
 * String 要返回的对象  将数据拆分  一个一个的输出
 */
class MyFlatMap implements FlatMapFunction <User ,String>{

    @Override
    public void flatMap(User user, Collector<String> collector) throws Exception {
        if (user.age.equals(12)) {
            // 把符合条件的数据输出出来
            collector.collect(user.name);
        }
    }
}

